AI项目实战(13)自主任务规划 Agent

《AI Agent 实战》系列 · 自主任务规划 Agent

Posted by Ryan on 2026-07-05
Estimated Reading Time 55 Minutes
Words 12.5k In Total
Viewed Times

主讲能力:任务规划、风险评估、反思、长期经验沉淀
业务场景:用户给出复杂模糊目标,Agent 自动拆解计划、识别风险、推进低风险事项、创建人工交接,并把复盘经验写入长期记忆。
对应代码backend/projects/p10_autonomous_task/


12.1 老板甩来的目标

周一早上,老板路过工位,丢下一句:"帮我准备下季度产品发布会。"然后人就走了。

你愣在原地。这句话信息量极大,步骤却为零。发布会在哪天?预算多少?要不要请媒体?场地谁定?邀请函谁发?哪些事你能自己拍板,哪些事必须回头请示老板?你不可能冲进老板办公室问每一个细节,也不可能闷头把所有事全干了——万一你替老板拍板了一个五十万的场地,那是要背锅的。

于是你本能地做了一件事:先在脑子里把这句模糊的"目标"拆成一件件能动手的事,再给每件事贴个标签——这件我自己干、这件得问老板、这件有风险先别动。干完之后,你还会在备忘录里记一笔:"下次发布会要提前两周锁场地,不然临时溢价。"下回再有类似活儿,你翻翻备忘录,轻车熟路。

这就是自主任务规划:把一个模糊目标,变成一份可执行的清单,并清清楚楚地划出"能自动推进"和"必须人工确认"的边界。本章要造的,正是这样一个 Agent。

前面九个项目都在围绕具体场景打转:客服、知识库、代码、数据、内容、个人助理……每个都是一把"专精的锤子"。而本章要做的,是把前面练过的所有本事攥成一拳——规划、反思、记忆、工具、风险评估、人工交接——这是全书的集大成,也是最后一战。

真实世界的任务往往不是"查订单 10001"这样一句话就能搞定,而是:

  • 帮我准备一次产品发布
  • 帮我规划一次技术迁移
  • 帮我组织一次线上活动
  • 帮我制定一个学习计划

这些任务的难点,根本不在工具调用本身,而在四件事上:目标模糊、步骤繁多、风险不定、处处要人确认。自主任务规划 Agent 的职责,就是把这团乱麻理成一份可执行计划,并牢牢守住一条红线——分清"能自动推进"和"必须人工确认"。

功能需求

  • 检索历史经验。
  • 拆解复杂目标为 5-8 个任务。
  • 评估每个任务风险等级。
  • 自动推进低风险任务。
  • 高风险任务创建人工交接单。
  • 任务完成后生成复盘并写入长期记忆。

12.2 画个样子:它该长啥样

回到开头那个员工。他接到"准备发布会"后,脑子里其实跑了一条流水线:先翻备忘录(有没有干过类似的活儿)→ 把目标拆成几摊事 → 给每摊事估个风险 → 自己干能干的 → 脏活累活高风险活儿写成请示单递给老板 → 干完写复盘存档。我们的 Agent 走的也是同一条路,只不过每一步都落成可观测的结构化动作:

%%{init: {'theme':'base','flowchart':{'useMaxWidth':true,'htmlLabels':true}}}%%
graph TD
    Goal["模糊目标"] --> Recall["检索历史经验"]
    Recall --> Plan["拆解任务"]
    Plan --> Risk["风险评估"]
    Risk -->|低风险| Exec["自动执行"]
    Risk -->|高风险| Handoff["人工交接"]
    Exec --> Reflect["反思复盘"]
    Handoff --> Reflect
    Reflect --> Memory["写入长期记忆"]
    Memory --> Report["最终报告"]

    classDef p fill:#e3f2fd,stroke:#1976d2,stroke-width:2px,color:#0d47a1
    classDef r fill:#ede7f6,stroke:#5e35b1,stroke-width:2px,color:#311b92
    classDef o fill:#e8f5e9,stroke:#43a047,stroke-width:2px,color:#1b5e20
    class Goal,Recall,Plan,Risk p
    class Exec,Handoff,Reflect r
    class Memory,Report o

蓝色是"想"——把模糊目标想清楚;紫色是"做"——但做之前先分清哪些该自己做、哪些得交出去;绿色是"长记性"——做完一次,就往长期记忆里存一笔。注意那条分叉:风险评估一出来,路就分成两条,低风险往下走自动执行,高风险往右拐进人工交接。这条分叉,是整章的灵魂。


12.3 拆开看:怎么造出来

12.3.1 任务模型

1
2
3
4
5
6
7
8
@dataclass
class TaskItem:
id: str
title: str
owner: str = "agent"
status: str = "todo"
risk: str = "low"
notes: str = ""

任务必须结构化,否则后续风险评估、自动执行、人工交接都无从谈起——就像你给自己列待办,光写"搞发布会"三个字,下一秒就不知道该干嘛了。owner 说清这件事归谁,risk 说清这件事能不能乱动,status 说清这件事到哪一步了。三个字段,把一团模糊钉成一张可操作的工单。

12.3.2 风险边界

生产级自主 Agent 的核心原则只有一句:能自动化的,不等于应该自动执行的

这句话值得反复咀嚼。一个 Agent 能调用发邮件的接口、能改数据库、能下单付款——技术上它"能"。但"能"和"该"之间,隔着一道叫"后果"的墙。发错一封群邮件、改错一行生产数据、多刷一笔款子,这些都是不可逆、有外部影响、真金白银的坑。所以本章用一套规则,给每个任务上一道闸:

风险 行为
low 可自动执行
medium 可给建议,但需补充信息
high 创建人工交接,不自动执行

💡 顿悟时刻:很多人第一次做自主 Agent,最容易踩的坑就是"既然它能干,那就让它干呗"。但生产环境里,自主权越大,闯祸的能力也越大。真正成熟的 Agent 不是"什么都能自己干"的 Agent,而是"知道自己不该干什么"的 Agent。把高风险动作强制交接给人工,不是 Agent 能力弱,恰恰是它最懂事的体现。

⚠️ 避坑:风险分级不是写死在 Prompt 里就万事大吉——Prompt 是"建议",Agent 心情好可能听,心情不好可能不听。所以本章的闸门是两道的:Prompt 里写"高风险必须交接",代码里 needs_human_confirmationrisk == high 时硬性拦截执行。Prompt 管"该不该",代码管"能不能",两道闸门一起守,才守得住边界。

12.3.3 记忆沉淀

每次任务结束后,Agent 调用 remember_lesson 写入长期经验。下一次类似任务开始时,先调用 recall_lessons 把旧经验翻出来。

这一步,乍看平淡,其实是整章最反直觉的设计。有人会问:Agent 每次重新生成计划不就行了,费这劲存历史干嘛?

答案藏在"为什么老员工比新员工靠谱"里。新员工每次接活都从零开始推理,凭的是通用经验;老员工接活先翻自己的备忘录——“上次干这事栽在场地没提前锁,这次第一件事就是锁场地”。这一翻,省下的不是几秒钟,而是整个踩坑循环。反思复盘之所以要写入长期记忆,就是因为一次反思的价值不在当下这个任务(它已经做完了),而在下一个相似任务——只有存下来,经验才能跨越会话累积,Agent 才会"越用越懂你、越用越专业"。不写进记忆的反思,就像写完日记就扔进碎纸机,复盘等于没做。


12.4 动手写:三层架构完整代码

本节给出自主任务规划 Agent 的完整代码实现。代码按"模型层 / 提示词层 / 工具层 / 记忆层 / 服务层 / 项目层 / 入口"分层组织,每一层职责单一、互相解耦,最终由服务层编排成"规划 → 执行 → 反思"闭环。

读这段代码有个窍门:先别一头扎进某个工具的细节,先从服务层 AutonomousTaskService.run_full_workflow 往下看。它是这场戏的"导演",四个阶段一气呵成;其余每层都是被它调度的"演员"。导演怎么排戏,决定了整出戏怎么走。

12.4.1 三层架构完整代码

层次 文件 职责
模型层 models.py 任务状态机、风险等级、经验/反思模型
提示词层 prompts.py 自主规划系统提示词、拆解/评估/反思模板
工具层 tools.py 检索/拆解/评估/执行/交接/沉淀工具
记忆层 memory.py 经验沉淀记忆系统(SQLite)
服务层 service.py 规划-执行-反思闭环编排
项目层 project.py 项目注册、对外接口
入口 init.py 注册与 re-export

models.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
"""项目十:数据模型层。

定义自主任务规划 Agent 的领域模型和数据结构。
"""
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, TypedDict


class TaskStatus(str, Enum):
"""任务状态枚举。"""
TODO = "todo" # 待办
IN_PROGRESS = "in_progress" # 执行中
COMPLETED = "completed" # 已完成
BLOCKED = "blocked" # 阻塞
HANDED_OFF = "handed_off" # 已交接给人工


class TaskRisk(str, Enum):
"""任务风险等级。"""
LOW = "low" # 低风险:可自动执行
MEDIUM = "medium" # 中风险:需补充信息
HIGH = "high" # 高风险:需人工确认


class TaskOwner(str, Enum):
"""任务执行者。"""
AGENT = "agent" # 由 Agent 执行
HUMAN = "human" # 需人工执行
BOTH = "both" # 需要协作


@dataclass
class TaskItem:
"""任务项模型。"""
id: str
title: str
owner: TaskOwner = TaskOwner.AGENT
status: TaskStatus = TaskStatus.TODO
risk: TaskRisk = TaskRisk.LOW
notes: str = ""
dependencies: list[str] = field(default_factory=list)
estimated_hours: float = 0.0
created_at: str = field(default_factory=lambda: datetime.now().isoformat())

def to_dict(self) -> dict[str, Any]:
"""转换为字典。"""
return {
"id": self.id,
"title": self.title,
"owner": self.owner.value,
"status": self.status.value,
"risk": self.risk.value,
"notes": self.notes,
"dependencies": self.dependencies,
"estimated_hours": self.estimated_hours,
}

def is_safe_to_execute(self) -> bool:
"""判断是否可以安全执行。

Returns:
是否可以自动执行
"""
return (
self.owner == TaskOwner.AGENT
and self.risk == TaskRisk.LOW
and self.status == TaskStatus.TODO
)

def needs_human_confirmation(self) -> bool:
"""判断是否需要人工确认。

Returns:
是否需要人工确认
"""
return self.risk == TaskRisk.HIGH or self.owner == TaskOwner.HUMAN


@dataclass
class RiskAssessment:
"""风险评估结果。"""
task_id: str
risk_level: TaskRisk
reasons: list[str] = field(default_factory=list)
mitigations: list[str] = field(default_factory=list)
recommendation: str = ""

def to_summary(self) -> str:
"""生成摘要。"""
risk_emoji = {
TaskRisk.LOW: "✅",
TaskRisk.MEDIUM: "🟡",
TaskRisk.HIGH: "⚠️",
}.get(self.risk_level, "❓")

lines = [f"{risk_emoji} 任务 {self.task_id} 风险等级: {self.risk_level.value}"]

if self.reasons:
lines.append(" 原因:")
for reason in self.reasons:
lines.append(f" - {reason}")

if self.mitigations:
lines.append(" 缓解措施:")
for mitigation in self.mitigations:
lines.append(f" - {mitigation}")

if self.recommendation:
lines.append(f" 建议: {self.recommendation}")

return "\n".join(lines)


@dataclass
class HandoffTicket:
"""人工交接单。"""
ticket_id: str
task_title: str
reason: str
priority: TaskRisk = TaskRisk.HIGH
context: str = ""
status: str = "pending" # pending, approved, rejected
created_at: str = field(default_factory=lambda: datetime.now().isoformat())

def to_summary(self) -> str:
"""生成摘要。"""
return (
f"🎫 交接单 #{self.ticket_id}\n"
f" 任务: {self.task_title}\n"
f" 原因: {self.reason}\n"
f" 优先级: {self.priority.value}\n"
f" 状态: {self.status}"
)


@dataclass
class Lesson:
"""经验教训模型。"""
goal: str
lesson: str
category: str = "general" # general, technical, process, communication
success: bool = True
created_at: str = field(default_factory=lambda: datetime.now().isoformat())

def to_summary(self) -> str:
"""生成摘要。"""
status = "✅ 成功经验" if self.success else "⚠️ 教训"
return f"{status} | 目标: {self.goal}\n经验: {self.lesson}"


@dataclass
class ReflectionResult:
"""反思结果。"""
what_went_well: list[str] = field(default_factory=list)
what_went_wrong: list[str] = field(default_factory=list)
lessons_learned: list[str] = field(default_factory=list)
improvement_actions: list[str] = field(default_factory=list)
overall_score: int = 5 # 1-10 分

def to_markdown(self) -> str:
"""转换为 Markdown 格式。"""
lines = [
"# 复盘报告",
"",
f"**整体评分**: {self.overall_score}/10",
"",
"## 做得好的",
"",
]

for item in self.what_went_well:
lines.append(f"- {item}")

lines.extend(["", "## 做得不好的", ""])
for item in self.what_went_wrong:
lines.append(f"- {item}")

lines.extend(["", "## 经验总结", ""])
for item in self.lessons_learned:
lines.append(f"- {item}")

lines.extend(["", "## 改进措施", ""])
for item in self.improvement_actions:
lines.append(f"- {item}")

return "\n".join(lines)


# LangGraph 状态定义
class PlannerState(TypedDict):
"""规划器状态(用于 LangGraph)。"""
goal: str
plan: list[dict[str, Any]]
execution_log: list[str]
reflection: str
final_report: str


@dataclass
class PlanningTask:
"""规划任务。"""
task_id: str
goal: str
status: TaskStatus = TaskStatus.TODO
plan: list[TaskItem] = field(default_factory=list)
risk_assessments: list[RiskAssessment] = field(default_factory=list)
handoffs: list[HandoffTicket] = field(default_factory=list)
execution_log: list[str] = field(default_factory=list)
reflection: ReflectionResult | None = None
created_at: str = field(default_factory=lambda: datetime.now().isoformat())
completed_at: str = ""

def add_task_item(self, item: TaskItem) -> None:
"""添加任务项。"""
self.plan.append(item)

def get_safe_tasks(self) -> list[TaskItem]:
"""获取可以安全执行的任务。"""
return [t for t in self.plan if t.is_safe_to_execute()]

def get_high_risk_tasks(self) -> list[TaskItem]:
"""获取需要人工确认的高风险任务。"""
return [t for t in self.plan if t.needs_human_confirmation()]

def log(self, message: str) -> None:
"""记录执行日志。"""
timestamp = datetime.now().strftime("%H:%M:%S")
self.execution_log.append(f"[{timestamp}] {message}")

prompts.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
"""项目十:Prompt 层。

定义自主任务规划 Agent 的系统提示词。
"""
from __future__ import annotations

# 主 Agent 系统提示词
AUTONOMOUS_AGENT_PROMPT = """你是自主任务规划 Agent,擅长把模糊目标转成可执行计划。

## 核心能力
1. **任务拆解**:将复杂目标分解为 5-8 个可执行任务
2. **风险评估**:识别每个任务的风险等级和所需确认
3. **安全执行**:自动执行低风险任务,高风险任务交接给人工
4. **经验沉淀**:复盘任务过程,将经验写入长期记忆

## 工作闭环(必须严格遵循)
1. **回忆经验**:先调用 recall_lessons 检索相关历史经验
2. **任务拆解**:调用 decompose_goal 将目标拆成可执行任务
3. **风险评估**:调用 assess_risk 评估每个任务的风险
4. **安全执行**:低风险任务调用 execute_safe_task 推进
5. **人工交接**:高风险任务必须 create_handoff,不能擅自执行
6. **复盘沉淀**:任务完成后调用 remember_lesson 保存经验

## 风险分级标准
- **低风险 (low)**:可逆操作、无外部影响、不涉及费用 → 可自动执行
- **中风险 (medium)**:需要补充信息、有轻度外部影响 → 标注并建议
- **高风险 (high)**:涉及费用、权限变更、不可逆操作、外部影响大 → 必须人工确认

## 输出要求
用 Markdown 格式输出,包含:
1. **目标理解**:复述目标,明确成功标准
2. **任务拆解**:列出所有任务,标注 ID、负责人、风险
3. **风险评估**:每个任务的风险说明和建议
4. **执行结果**:已执行的任务和结果
5. **人工确认事项**:需要人工处理的事项
6. **复盘经验**:本次任务的经验总结

## 重要原则
- **不要声称已经完成无法真实执行的外部动作**
- 涉及费用、权限、安全的操作,必须明确标注需要人工确认
- 对不确定的任务,主动提问而不是猜测
- 每一步操作都有清晰的理由说明

现在,请开始处理用户的目标。"""


# 任务拆解提示词
DECOMPOSE_PROMPT = """你是任务规划专家。请将以下目标拆解为 5-8 个可执行的任务。

## 目标
{goal}

## 历史经验参考
{lessons}

## 拆解原则
1. **MECE 原则**:任务之间相互独立、完全穷尽
2. **可执行性**:每个任务都是具体可执行的,不是抽象概念
3. **有明确产出**:每个任务有明确的交付物
4. **标注风险**:根据任务性质标注风险等级
5. **标注负责人**:明确是 Agent 能做还是需要人工

## 任务字段说明
- id: 任务编号 (T1, T2, ...)
- title: 任务标题(动宾结构,如"梳理时间线")
- owner: 负责人 (agent/human/both)
- status: 状态 (todo)
- risk: 风险等级 (low/medium/high)
- notes: 备注(说明风险原因或执行要点)

## 输出格式
返回 JSON 数组,每个元素是一个任务对象。

请输出任务列表:"""


# 风险评估提示词
RISK_ASSESSMENT_PROMPT = """你是风险评估专家。请评估以下任务的风险等级。

## 待评估任务
{tasks_json}

## 风险评估维度
1. **可逆性**:操作是否可撤销?不可逆 = 高风险
2. **外部影响**:是否影响外部系统或人员?影响大 = 高风险
3. **费用涉及**:是否涉及金钱?涉及 = 高风险
4. **权限要求**:是否需要特殊权限?需要 = 中高风险
5. **信息充分性**:是否有足够信息执行?不足 = 中风险

## 输出要求
对每个任务,给出:
- 风险等级 (low/medium/high)
- 风险原因(1-2 句话)
- 缓解措施建议
- 是否建议人工确认

请输出评估结果:"""


# 执行提示词
EXECUTION_PROMPT = """你是任务执行专家。请执行以下低风险任务。

## 任务
{task_title}

## 任务背景
{context}

## 执行原则
1. 只执行低风险任务,高风险任务必须交接
2. 执行过程要有清晰的步骤和产出
3. 如果发现任务实际风险高于预期,立即停止并交接
4. 执行结果要具体、可验证

## 输出要求
- 执行步骤
- 执行结果
- 下一步建议

请执行任务:"""


# 反思提示词
REFLECTION_PROMPT = """你是项目复盘专家。请对以下任务执行过程进行反思总结。

## 任务目标
{goal}

## 执行日志
{execution_log}

## 完成的任务
{completed_tasks}

## 交接的任务
{handed_off_tasks}

## 复盘框架
1. **做得好的**:哪些方面执行顺利?为什么?
2. **做得不好的**:哪些方面有问题?根因是什么?
3. **经验总结**:从中学到什么可复用的经验?
4. **改进措施**:下次如何做得更好?
5. **整体评分**:1-10 分,说明理由

## 输出要求
- 客观真实,不掩饰问题
- 经验要具体可操作,能指导未来类似任务
- 改进措施要可执行

请输出复盘报告:"""


# 人工交接提示词
HANDOFF_PROMPT = """你需要为以下高风险任务创建人工交接单。

## 任务信息
- 标题: {task_title}
- 风险等级: {risk_level}
- 风险原因: {reason}

## 交接单要求
1. 清晰说明为什么需要人工确认
2. 提供足够的上下文信息
3. 明确期望人工做什么
4. 标注优先级和截止时间

请生成交接单内容:"""

tools.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
"""项目十:工具层。

定义自主任务规划 Agent 的所有工具函数。
"""
from __future__ import annotations

import json
from typing import Any

from langchain.tools import tool

from core.logging_conf import get_logger

from .memory import get_memory
from .models import HandoffTicket, RiskAssessment, TaskItem, TaskOwner, TaskRisk, TaskStatus

logger = get_logger("p10.autonomous_task.tools")


@tool
def recall_lessons(keyword: str) -> str:
"""检索历史任务经验。

在开始规划前调用,参考历史经验避免重复错误。

Args:
keyword: 搜索关键词,如目标相关词或任务类型

Returns:
相关历史经验列表
"""
logger.info("检索历史经验: %s", keyword)

memory = get_memory()
lessons = memory.recall_lessons(keyword, limit=5)

if not lessons:
return f"📚 暂无与 '{keyword}' 相关的历史经验。这是一个全新的挑战!"

lines = [f"📚 找到 {len(lessons)} 条相关历史经验:", ""]
for i, lesson in enumerate(lessons, 1):
status = "✅" if lesson.success else "⚠️"
lines.append(f"### 经验 {i} {status}")
lines.append(f"目标: {lesson.goal}")
lines.append(f"经验: {lesson.lesson}")
lines.append(f"类别: {lesson.category}")
lines.append("")

return "\n".join(lines)


@tool
def decompose_goal(goal: str) -> str:
"""将复杂目标拆解为 5-8 个可执行任务,返回 JSON 列表。

Args:
goal: 要拆解的目标描述

Returns:
JSON 格式的任务列表
"""
logger.info("拆解目标: %s", goal)

# 生产环境中该工具可接入项目管理系统/历史模板/LLM 生成
# 这里返回结构化示例,实际可由 Agent 调用 LLM 生成
tasks = [
TaskItem(
id="T1",
title="澄清目标和成功标准",
owner=TaskOwner.AGENT,
risk=TaskRisk.LOW,
notes="明确什么算成功",
),
TaskItem(
id="T2",
title="梳理关键里程碑和时间线",
owner=TaskOwner.AGENT,
risk=TaskRisk.LOW,
notes="时间节点要具体",
),
TaskItem(
id="T3",
title="识别依赖方和所需资源",
owner=TaskOwner.AGENT,
risk=TaskRisk.MEDIUM,
notes="需要确认资源可用性",
),
TaskItem(
id="T4",
title="生成初版执行计划",
owner=TaskOwner.AGENT,
risk=TaskRisk.LOW,
notes="含负责人和时间",
),
TaskItem(
id="T5",
title="标记需要人工确认的高风险事项",
owner=TaskOwner.HUMAN,
risk=TaskRisk.HIGH,
notes="涉及费用或权限",
),
TaskItem(
id="T6",
title="执行可自动化的准备工作",
owner=TaskOwner.AGENT,
risk=TaskRisk.LOW,
notes="文档整理、信息收集",
),
]

result = json.dumps(
[t.to_dict() for t in tasks],
ensure_ascii=False,
indent=2,
)

logger.info("目标已拆解为 %d 个任务", len(tasks))
return result


@tool
def assess_risk(task_json: str) -> str:
"""评估任务风险,返回风险说明和建议。

对每个任务标注风险等级,并说明风险原因和缓解措施。

Args:
task_json: 任务列表的 JSON 字符串

Returns:
风险评估报告
"""
logger.info("评估任务风险")

try:
tasks = json.loads(task_json)
except json.JSONDecodeError:
return "❌ 任务 JSON 无法解析,请重新生成计划。"

if not isinstance(tasks, list):
return "❌ 任务数据格式错误,应为数组。"

lines = ["⚠️ 风险评估报告", ""]

for task in tasks:
risk_str = task.get("risk", "low")
title = task.get("title", "")
task_id = task.get("id", "")

try:
risk = TaskRisk(risk_str)
except ValueError:
risk = TaskRisk.LOW

risk_emoji = {
TaskRisk.LOW: "✅",
TaskRisk.MEDIUM: "🟡",
TaskRisk.HIGH: "⚠️",
}.get(risk, "❓")

if risk == TaskRisk.HIGH:
lines.append(f"{risk_emoji} 高风险:{title} —— 需要人工确认后执行。")
lines.append(f" 原因:涉及费用/权限/不可逆操作,建议补充详细方案。")
elif risk == TaskRisk.MEDIUM:
lines.append(f"{risk_emoji} 中风险:{title} —— 建议补充依赖和负责人。")
lines.append(f" 建议:明确资源需求,确认依赖项可用。")
else:
lines.append(f"{risk_emoji} 低风险:{title} —— 可自动推进。")
lines.append("")

return "\n".join(lines)


@tool
def execute_safe_task(task_title: str) -> str:
"""执行低风险任务(模拟)。

仅用于执行标记为低风险的任务。高风险任务不应调用此工具。

Args:
task_title: 要执行的任务标题

Returns:
执行结果
"""
logger.info("执行低风险任务: %s", task_title)

# 生产环境:这里调用真实的服务或 API 执行任务
# 例如:创建文档、发送通知、更新数据库等

return (
f"✅ 已完成:{task_title}\n\n"
f"执行产出:\n"
f"- 结构化说明文档已生成\n"
f"- 相关信息已收集整理\n"
f"- 下一步建议已明确\n\n"
f"状态:已完成"
)


@tool
def create_handoff(task_title: str, reason: str) -> str:
"""为需要人工确认的任务创建交接单。

高风险任务必须通过此工具交接给人工处理,不能擅自执行。

Args:
task_title: 需要人工处理的任务标题
reason: 需要人工确认的原因

Returns:
交接单信息
"""
logger.info("创建人工交接单: %s", task_title)

import uuid

ticket = HandoffTicket(
ticket_id=f"HO-{uuid.uuid4().hex[:6].upper()}",
task_title=task_title,
reason=reason,
priority=TaskRisk.HIGH,
context="由自主任务规划 Agent 评估为高风险任务",
)

return (
f"🎫 已创建人工确认单\n\n"
f"{ticket.to_summary()}\n\n"
f"请人工确认后执行。"
)


@tool
def remember_lesson(goal: str, lesson: str, category: str = "general",
success: bool = True) -> str:
"""将本次任务复盘经验写入长期记忆。

在任务完成后调用,沉淀经验供未来参考。

Args:
goal: 相关目标
lesson: 经验内容
category: 类别 (general/technical/process/communication)
success: 是否是成功经验

Returns:
保存结果
"""
logger.info("保存经验: %s", lesson[:50])

memory = get_memory()
lesson_id = memory.remember_lesson(goal, lesson, category, success)

status = "成功经验" if success else "教训"
return f"✅ {status}已写入长期记忆 (ID: {lesson_id})。"


@tool
def get_lesson_statistics() -> str:
"""获取历史经验统计信息。

Returns:
统计报告
"""
logger.info("获取经验统计")

memory = get_memory()
stats = memory.get_statistics()

if stats["total"] == 0:
return "📊 暂无历史经验数据。"

lines = [
"📊 经验统计报告",
"",
f"总经验数: {stats['total']}",
f"成功经验: {stats['success']}",
f"失败教训: {stats['failure']}",
"",
"按类别分布:",
]

for category, count in stats["categories"].items():
lines.append(f" - {category}: {count} 条")

return "\n".join(lines)


@tool
def get_task_categories() -> str:
"""获取所有经验类别。

Returns:
类别列表
"""
memory = get_memory()
categories = memory.get_all_categories()

if not categories:
return "📂 暂无经验类别。"

lines = ["📂 经验类别:"]
for cat in categories:
lines.append(f" - {cat}")

return "\n".join(lines)


# ========== 工具列表 ==========
def get_all_tools() -> list[Any]:
"""获取所有可用工具列表。

Returns:
工具对象列表
"""
return [
recall_lessons,
decompose_goal,
assess_risk,
execute_safe_task,
create_handoff,
remember_lesson,
get_lesson_statistics,
get_task_categories,
]

memory.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
"""项目十:记忆层。

定义自主任务规划 Agent 的长期记忆系统(经验沉淀)。
"""
from __future__ import annotations

import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Any

from core.config import DATA_DIR
from core.logging_conf import get_logger

from .models import Lesson

logger = get_logger("p10.autonomous_task.memory")

DB_PATH = DATA_DIR / "autonomous_task.db"


class TaskMemory:
"""自主任务记忆系统。

存储任务执行过程中的经验教训,支持:
- 保存经验教训
- 按关键词检索历史经验
- 按类别浏览经验
- 统计分析经验数据
"""

def __init__(self, db_path: Path | None = None) -> None:
self._db_path = db_path or DB_PATH
self._init_db()

def _init_db(self) -> None:
"""初始化数据库表结构。"""
conn = sqlite3.connect(str(self._db_path))
try:
# 经验教训表
conn.execute(
"""
CREATE TABLE IF NOT EXISTS lessons (
id INTEGER PRIMARY KEY AUTOINCREMENT,
goal TEXT NOT NULL,
lesson TEXT NOT NULL,
category TEXT NOT NULL DEFAULT 'general',
success INTEGER NOT NULL DEFAULT 1,
created_at TEXT NOT NULL
)
"""
)

# 任务记录表
conn.execute(
"""
CREATE TABLE IF NOT EXISTS task_records (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT NOT NULL,
goal TEXT NOT NULL,
status TEXT NOT NULL,
task_count INTEGER NOT NULL DEFAULT 0,
completed_count INTEGER NOT NULL DEFAULT 0,
handoff_count INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL,
completed_at TEXT,
reflection TEXT
)
"""
)

# 创建索引提升查询性能
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_lessons_goal ON lessons(goal)"
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_lessons_category ON lessons(category)"
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_lessons_keyword ON lessons(lesson)"
)

conn.commit()
logger.info("任务记忆系统初始化完成: %s", self._db_path)
finally:
conn.close()

def remember_lesson(self, goal: str, lesson: str, category: str = "general",
success: bool = True) -> int:
"""保存经验教训。

Args:
goal: 相关目标
lesson: 经验内容
category: 类别 (general/technical/process/communication)
success: 是否是成功经验

Returns:
记录 ID
"""
conn = sqlite3.connect(str(self._db_path))
try:
cursor = conn.execute(
"""
INSERT INTO lessons (goal, lesson, category, success, created_at)
VALUES (?, ?, ?, ?, ?)
""",
(
goal,
lesson,
category,
1 if success else 0,
datetime.now().isoformat(),
),
)
conn.commit()
lesson_id = cursor.lastrowid or 0
logger.info("保存经验 #%d: %s", lesson_id, lesson[:50])
return lesson_id
finally:
conn.close()

def recall_lessons(self, keyword: str, limit: int = 5) -> list[Lesson]:
"""检索历史经验。

Args:
keyword: 搜索关键词
limit: 返回数量限制

Returns:
经验列表
"""
conn = sqlite3.connect(str(self._db_path))
conn.row_factory = sqlite3.Row
try:
rows = conn.execute(
"""
SELECT goal, lesson, category, success, created_at
FROM lessons
WHERE goal LIKE ? OR lesson LIKE ?
ORDER BY id DESC
LIMIT ?
""",
(f"%{keyword}%", f"%{keyword}%", limit),
).fetchall()

return [
Lesson(
goal=row["goal"],
lesson=row["lesson"],
category=row["category"],
success=bool(row["success"]),
created_at=row["created_at"],
)
for row in rows
]
finally:
conn.close()

def get_lessons_by_category(self, category: str, limit: int = 10) -> list[Lesson]:
"""按类别获取经验。

Args:
category: 类别
limit: 返回数量限制

Returns:
经验列表
"""
conn = sqlite3.connect(str(self._db_path))
conn.row_factory = sqlite3.Row
try:
rows = conn.execute(
"""
SELECT goal, lesson, category, success, created_at
FROM lessons
WHERE category = ?
ORDER BY id DESC
LIMIT ?
""",
(category, limit),
).fetchall()

return [
Lesson(
goal=row["goal"],
lesson=row["lesson"],
category=row["category"],
success=bool(row["success"]),
created_at=row["created_at"],
)
for row in rows
]
finally:
conn.close()

def get_all_categories(self) -> list[str]:
"""获取所有经验类别。

Returns:
类别列表
"""
conn = sqlite3.connect(str(self._db_path))
try:
rows = conn.execute(
"SELECT DISTINCT category FROM lessons ORDER BY category"
).fetchall()
return [row[0] for row in rows]
finally:
conn.close()

def get_statistics(self) -> dict[str, Any]:
"""获取经验统计信息。

Returns:
统计数据
"""
conn = sqlite3.connect(str(self._db_path))
try:
total = conn.execute("SELECT COUNT(*) FROM lessons").fetchone()[0]
success_count = conn.execute(
"SELECT COUNT(*) FROM lessons WHERE success = 1"
).fetchone()[0]
failure_count = total - success_count

# 按类别统计
category_rows = conn.execute(
"""
SELECT category, COUNT(*) as count
FROM lessons
GROUP BY category
ORDER BY count DESC
"""
).fetchall()
categories = {row[0]: row[1] for row in category_rows}

return {
"total": total,
"success": success_count,
"failure": failure_count,
"categories": categories,
}
finally:
conn.close()

def save_task_record(self, task_id: str, goal: str, status: str,
task_count: int, completed_count: int, handoff_count: int,
reflection: str = "") -> None:
"""保存任务执行记录。

Args:
task_id: 任务 ID
goal: 目标
status: 最终状态
task_count: 总任务数
completed_count: 已完成数
handoff_count: 交接数
reflection: 复盘内容
"""
conn = sqlite3.connect(str(self._db_path))
try:
conn.execute(
"""
INSERT INTO task_records
(task_id, goal, status, task_count, completed_count,
handoff_count, created_at, completed_at, reflection)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
task_id,
goal,
status,
task_count,
completed_count,
handoff_count,
datetime.now().isoformat(),
datetime.now().isoformat(),
reflection,
),
)
conn.commit()
logger.info("保存任务记录: %s", task_id)
finally:
conn.close()


# 全局记忆实例
_memory: TaskMemory | None = None


def get_memory() -> TaskMemory:
"""获取任务记忆系统实例。

Returns:
TaskMemory 单例
"""
global _memory
if _memory is None:
_memory = TaskMemory()
return _memory

service.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
"""项目十:服务层。

封装自主任务规划 Agent 的核心业务逻辑。
"""
from __future__ import annotations

from typing import Any

from langchain.agents import create_agent

from core import build_chat_model
from core.logging_conf import get_logger

from .memory import get_memory
from .models import (
PlanningTask,
TaskItem,
TaskOwner,
TaskRisk,
TaskStatus,
)
from .prompts import AUTONOMOUS_AGENT_PROMPT
from .tools import get_all_tools

logger = get_logger("p10.autonomous_task.service")


class AutonomousTaskService:
"""自主任务规划服务类。

封装自主任务规划的核心业务逻辑:
- 规划-执行-反思闭环 (Plan → Execute → Reflect → Revise)
- 任务状态机管理
- 风险评估和安全执行
- 经验沉淀到长期记忆
"""

def __init__(self) -> None:
self._agent: Any | None = None
self._memory = get_memory()
self._tasks: dict[str, PlanningTask] = {}

def build_agent(self) -> Any:
"""构建自主任务规划 Agent。

Returns:
LangChain Agent 对象
"""
if self._agent is not None:
return self._agent

self._agent = create_agent(
model=build_chat_model(),
tools=get_all_tools(),
system_prompt=AUTONOMOUS_AGENT_PROMPT,
)
return self._agent

def create_planning_task(self, goal: str, task_id: str | None = None) -> PlanningTask:
"""创建规划任务。

Args:
goal: 任务目标
task_id: 可选任务 ID

Returns:
规划任务对象
"""
import uuid

if task_id is None:
task_id = f"plan_{uuid.uuid4().hex[:8]}"

task = PlanningTask(
task_id=task_id,
goal=goal,
status=TaskStatus.TODO,
)
self._tasks[task_id] = task
task.log(f"创建规划任务: {goal}")
logger.info("创建规划任务: %s", task_id)
return task

def get_task(self, task_id: str) -> PlanningTask | None:
"""获取规划任务。

Args:
task_id: 任务 ID

Returns:
规划任务对象或 None
"""
return self._tasks.get(task_id)

async def run_planning_phase(self, task: PlanningTask) -> bool:
"""运行规划阶段。

Args:
task: 规划任务

Returns:
是否成功
"""
task.status = TaskStatus.IN_PROGRESS
task.log("开始规划阶段")

logger.info("[%s] 规划阶段开始", task.task_id)

from .tools import decompose_goal

try:
result_json = decompose_goal.invoke({"goal": task.goal})

import json
tasks_data = json.loads(result_json)

for task_data in tasks_data:
task_item = TaskItem(
id=task_data["id"],
title=task_data["title"],
owner=TaskOwner(task_data.get("owner", "agent")),
status=TaskStatus.TODO,
risk=TaskRisk(task_data.get("risk", "low")),
notes=task_data.get("notes", ""),
)
task.add_task_item(task_item)

task.log(f"规划完成,共 {len(task.plan)} 个任务")
logger.info("规划完成,共 %d 个任务", len(task.plan))
return len(task.plan) > 0
except Exception as e:
task.log(f"规划阶段失败: {e}")
logger.error("规划阶段失败: %s", e)
return False

async def run_risk_assessment_phase(self, task: PlanningTask) -> bool:
"""运行风险评估阶段。

Args:
task: 规划任务

Returns:
是否成功
"""
task.log("开始风险评估阶段")
logger.info("[%s] 风险评估阶段开始", task.task_id)

from .tools import assess_risk

try:
import json

tasks_json = json.dumps(
[t.to_dict() for t in task.plan],
ensure_ascii=False,
)

result = assess_risk.invoke({"task_json": tasks_json})
task.log("风险评估完成")

# 统计高风险任务
high_risk_count = len(task.get_high_risk_tasks())
if high_risk_count > 0:
task.log(f"发现 {high_risk_count} 个高风险任务,需人工确认")

return True
except Exception as e:
task.log(f"风险评估失败: {e}")
logger.error("风险评估失败: %s", e)
return False

async def run_execution_phase(self, task: PlanningTask) -> None:
"""运行执行阶段。

Args:
task: 规划任务
"""
task.log("开始执行阶段")
logger.info("[%s] 执行阶段开始", task.task_id)

from .tools import create_handoff, execute_safe_task

# 执行安全任务
safe_tasks = task.get_safe_tasks()
for task_item in safe_tasks:
task.log(f"执行低风险任务: {task_item.title}")
try:
result = execute_safe_task.invoke({"task_title": task_item.title})
task_item.status = TaskStatus.COMPLETED
task.log(f"任务完成: {task_item.title}")
except Exception as e:
task.log(f"任务执行失败: {task_item.title} - {e}")
task_item.status = TaskStatus.BLOCKED

# 高风险任务创建交接单
high_risk_tasks = task.get_high_risk_tasks()
for task_item in high_risk_tasks:
task.log(f"创建交接单: {task_item.title}")
try:
reason = task_item.notes or "高风险任务,需人工确认"
create_handoff.invoke({
"task_title": task_item.title,
"reason": reason,
})
task_item.status = TaskStatus.HANDED_OFF
task.log(f"已交接: {task_item.title}")
except Exception as e:
task.log(f"交接失败: {task_item.title} - {e}")

completed_count = sum(1 for t in task.plan if t.status == TaskStatus.COMPLETED)
handoff_count = sum(1 for t in task.plan if t.status == TaskStatus.HANDED_OFF)
task.log(f"执行阶段完成: 完成 {completed_count} 个,交接 {handoff_count} 个")

async def run_reflection_phase(self, task: PlanningTask) -> str:
"""运行反思阶段。

Args:
task: 规划任务

Returns:
反思报告
"""
task.log("开始反思阶段")
logger.info("[%s] 反思阶段开始", task.task_id)

# 生成反思报告
completed_tasks = [t for t in task.plan if t.status == TaskStatus.COMPLETED]
handed_off_tasks = [t for t in task.plan if t.status == TaskStatus.HANDED_OFF]

reflection = f"""# 任务复盘报告

## 目标
{task.goal}

## 执行情况
- 总任务数: {len(task.plan)}
- 已完成: {len(completed_tasks)}
- 已交接: {len(handed_off_tasks)}

## 已完成的任务
"""
for t in completed_tasks:
reflection += f"- ✅ {t.title}\n"

reflection += "\n## 已交接的任务\n"
for t in handed_off_tasks:
reflection += f"- 🎫 {t.title} (原因: {t.notes})\n"

reflection += f"""
## 经验总结
- 规划阶段应该更充分地考虑依赖关系
- 高风险任务应该提前识别并准备好人工确认流程
- 执行过程要保持日志记录,便于复盘

## 改进措施
- 增加任务依赖关系的可视化
- 优化风险评估的维度
- 建立标准化的交接流程
"""

# 保存经验到长期记忆
from .tools import remember_lesson

remember_lesson.invoke({
"goal": task.goal,
"lesson": f"完成 {len(completed_tasks)}/{len(task.plan)} 个任务,"
f"交接 {len(handed_off_tasks)} 个高风险任务",
"category": "process",
"success": len(completed_tasks) > 0,
})

# 保存任务记录
self._memory.save_task_record(
task_id=task.task_id,
goal=task.goal,
status=task.status.value,
task_count=len(task.plan),
completed_count=len(completed_tasks),
handoff_count=len(handed_off_tasks),
reflection=reflection,
)

task.status = TaskStatus.COMPLETED
task.log("反思阶段完成,任务结束")

return reflection

async def run_full_workflow(self, goal: str) -> str:
"""运行完整的规划-执行-反思工作流。

Args:
goal: 任务目标

Returns:
最终报告
"""
task = self.create_planning_task(goal)

# 阶段 1: 规划
success = await self.run_planning_phase(task)
if not success:
return "❌ 规划阶段失败,请重试"

# 阶段 2: 风险评估
await self.run_risk_assessment_phase(task)

# 阶段 3: 执行
await self.run_execution_phase(task)

# 阶段 4: 反思
reflection = await self.run_reflection_phase(task)

# 生成最终报告
return self.generate_final_report(task, reflection)

def generate_final_report(self, task: PlanningTask, reflection: str) -> str:
"""生成最终报告。

Args:
task: 规划任务
reflection: 反思内容

Returns:
最终报告
"""
completed = [t for t in task.plan if t.status == TaskStatus.COMPLETED]
handed_off = [t for t in task.plan if t.status == TaskStatus.HANDED_OFF]
blocked = [t for t in task.plan if t.status == TaskStatus.BLOCKED]

report = f"""# 自主任务规划报告

## 目标
{task.goal}

## 任务概览
- 总任务数: {len(task.plan)}
- ✅ 已完成: {len(completed)}
- 🎫 已交接: {len(handed_off)}
- ⛔ 已阻塞: {len(blocked)}

## 执行日志
"""
for log_entry in task.execution_log[-10:]: # 最近 10 条
report += f"- {log_entry}\n"

report += f"\n{reflection}"

return report

def get_task_history(self, limit: int = 20) -> list[PlanningTask]:
"""获取任务历史。

Args:
limit: 返回数量限制

Returns:
任务列表
"""
return list(self._tasks.values())[-limit:]


# 全局服务实例
_service: AutonomousTaskService | None = None


def get_service() -> AutonomousTaskService:
"""获取自主任务规划服务实例。

Returns:
AutonomousTaskService 单例
"""
global _service
if _service is None:
_service = AutonomousTaskService()
return _service

project.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
"""项目十:项目定义层。

定义自主任务规划 Agent 的项目注册和对外接口。
"""
from __future__ import annotations

from typing import Any

from core import BaseProject, registry

from .service import get_service
from .tools import get_all_tools


class AutonomousTaskProject(BaseProject):
"""自主任务规划 Agent(全能力集大成)。

主讲能力:任务规划 + 反思 + 记忆 + Tool + 人工交接协作

业务场景:用户给出一个相对模糊的复杂目标,例如"帮我准备一次产品发布",
Agent 自动拆解任务、评估风险、执行可自动化的步骤、沉淀经验,并输出可执行计划。

生产级特性:
- 规划-执行-反思闭环:Plan → Execute → Reflect → Revise
- 任务状态机:待办/执行中/已完成/阻塞/已交接
- 风险评估:对高风险任务标记为需要人工确认
- 经验沉淀:把复盘结论写入长期记忆
- 可观测输出:每一步都有结构化日志与状态摘要
"""

id = "p10_autonomous_task"
name = "自主任务规划 Agent"
description = "从模糊目标到可执行计划:规划、执行、反思、沉淀经验。"
capabilities = ["Planning", "Reflection", "Memory", "Tool", "Autonomy"]

def build_agent(self) -> Any:
"""构建自主任务规划 Agent 实例。

Returns:
LangChain Agent 对象
"""
service = get_service()
return service.build_agent()

def run(self, message: str) -> str:
"""运行规划任务。

Args:
message: 用户输入的目标描述

Returns:
规划报告
"""
import asyncio

service = get_service()

try:
# 尝试运行完整工作流
loop = asyncio.get_event_loop_policy().get_event_loop()
if loop.is_running():
# 如果已有事件循环(如在服务中),使用 Agent 直接运行
agent = self.build_agent()
return agent.run(message)
else:
# 否则异步运行完整工作流
return asyncio.run(service.run_full_workflow(message))
except Exception as e:
# 降级方案:直接用 Agent 运行
try:
agent = self.build_agent()
return agent.run(message)
except Exception as e2:
return f"❌ 任务规划失败: {e2}"


# 项目实例
project = AutonomousTaskProject()

# 对外暴露的快捷函数
run = project.run
build_agent = project.build_agent
get_tools = get_all_tools

__all__ = [
"AutonomousTaskProject",
"project",
"run",
"build_agent",
"get_tools",
]

init.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
"""项目十:自主任务规划 Agent(全能力集大成)

主讲能力:任务规划 + 反思 + 记忆 + Tool + 人工交接协作

业务场景:用户给出一个相对模糊的复杂目标,例如"帮我准备一次产品发布",
Agent 自动拆解任务、评估风险、执行可自动化的步骤、沉淀经验,并输出可执行计划。

生产级特性:
- 规划-执行-反思闭环:Plan → Execute → Reflect → Revise
- 任务状态机:待办/执行中/已完成/阻塞/已交接
- 风险评估:对高风险任务标记为需要人工确认
- 经验沉淀:把复盘结论写入长期记忆
- 可观测输出:每一步都有结构化日志与状态摘要

架构分层:
- models.py: 数据模型、任务状态机
- prompts.py: 系统提示词、模板
- tools.py: 规划/执行/反思工具
- memory.py: 经验沉淀记忆系统
- service.py: 业务逻辑、闭环编排
- project.py: 项目定义、对外接口
"""
from __future__ import annotations

from core import registry

# 导出 models 层
from .models import (
HandoffTicket,
Lesson,
PlanningTask,
PlannerState,
ReflectionResult,
RiskAssessment,
TaskItem,
TaskOwner,
TaskRisk,
TaskStatus,
)

# 导出 prompts 层
from .prompts import (
AUTONOMOUS_AGENT_PROMPT,
DECOMPOSE_PROMPT,
EXECUTION_PROMPT,
HANDOFF_PROMPT,
REFLECTION_PROMPT,
RISK_ASSESSMENT_PROMPT,
)

# 导出 memory 层
from .memory import TaskMemory, get_memory

# 导出 tools 层(保持向后兼容)
from .tools import (
assess_risk,
create_handoff,
decompose_goal,
execute_safe_task,
get_all_tools,
get_lesson_statistics,
get_task_categories,
recall_lessons,
remember_lesson,
)

# 导出 service 层
from .service import AutonomousTaskService, get_service

# 导出 project 层
from .project import (
AutonomousTaskProject,
build_agent,
get_tools,
project,
run,
)

# 向后兼容别名:原单文件版本使用 SYSTEM_PROMPT
SYSTEM_PROMPT = AUTONOMOUS_AGENT_PROMPT

# 注册项目到全局注册表
registry.register(project)

__all__ = [
# models
"TaskStatus",
"TaskRisk",
"TaskOwner",
"TaskItem",
"RiskAssessment",
"HandoffTicket",
"Lesson",
"ReflectionResult",
"PlannerState",
"PlanningTask",
# prompts
"AUTONOMOUS_AGENT_PROMPT",
"SYSTEM_PROMPT",
"DECOMPOSE_PROMPT",
"RISK_ASSESSMENT_PROMPT",
"EXECUTION_PROMPT",
"REFLECTION_PROMPT",
"HANDOFF_PROMPT",
# memory
"TaskMemory",
"get_memory",
# tools
"recall_lessons",
"decompose_goal",
"assess_risk",
"execute_safe_task",
"create_handoff",
"remember_lesson",
"get_lesson_statistics",
"get_task_categories",
"get_all_tools",
# service
"AutonomousTaskService",
"get_service",
# project
"AutonomousTaskProject",
"project",
"run",
"build_agent",
"get_tools",
]

12.4.2 核心代码讲解

1. 规划-执行-反思闭环(Plan → Execute → Reflect)

整个 Agent 的心脏,是服务层 AutonomousTaskService.run_full_workflow 串起的四阶段闭环:先 run_planning_phase 把模糊目标拆成结构化任务,再 run_risk_assessment_phase 给每个任务打风险标签,接着 run_execution_phase 自动推进低风险任务、把高风险任务挂起交接,最后 run_reflection_phase 复盘并沉淀经验。注意,这四个阶段不是"可选菜单",而是强制顺序——规划失败就直接返回错误,绝不允许带着一份空计划硬往下执行。反思阶段产出的复盘报告会拼进最终报告交回给用户,形成"做完一次任务、长一次经验"的正循环。

2. 任务状态机与风险评估

TaskStatus 定义了 todo / in_progress / completed / blocked / handed_off 五种状态,TaskRisk 定义了 low / medium / high 三级风险。TaskItem.is_safe_to_execute 用三条规则(owner 是 agent、risk 是 low、status 是 todo)判断任务能否自动执行;needs_human_confirmation 则在 risk 为 high 或 owner 为 human 时触发人工确认。这种"状态 + 风险"的双重约束,正是把"能自动化的"和"应该自动执行的"分开的关键——只有可逆、无外部影响、不涉及费用的操作,才允许 Agent 自己动手。

3. 高风险任务的人工交接机制

执行阶段对高风险任务调用 create_handoff 生成 HandoffTicket,并把任务状态置为 handed_off,绝不擅自执行。交接单里带着任务标题、风险原因、优先级和上下文,相当于给人工递过去一张"待办工单"。AUTONOMOUS_AGENT_PROMPT 里也白纸黑字写着"高风险任务必须 create_handoff,不能擅自执行"——Prompt 和代码两道闸门一起守,这就是 12.3.2 那条"能自动化 ≠ 该自动执行"原则的工程落地。

4. 经验沉淀到长期记忆

记忆层 TaskMemory 用 SQLite 存了一张 lessons 表,remember_lesson 写入、recall_lessons 按 goal/lesson 关键词 LIKE 检索。反思阶段会自动把"完成几个任务、交接几个高风险任务"作为一条 process 类经验写进去;下次类似目标开始时,recall_lessons 工具会先把历史经验喂给 Agent,让它像老员工翻备忘录一样避开旧坑。这就是"越用越懂你"的工程落点:经验不是一次性的结论,而是跨会话累积的资产。

⚠️ 避坑(记忆治理):经验库会越攒越大,时间一长,过期经验会变成噪音甚至误导——半年前"场地要先锁"的经验,放到今年改成线上发布会就不适用了。生产环境里别只管写入不管清理:要给经验加时间衰减、支持用户删除与导出,检索时优先返回近期、高相关的条目。一个无人治理的记忆库,最终会从"资产"退化成"负债"。

5. 服务层的异步工作流编排

四个阶段方法都是 async defrun_full_workflowawait 顺序编排,为将来接入真实异步工具(调外部 API、写消息队列)留好了口子。PlanningTask 作为可变状态对象在各阶段之间传递,每个阶段往 execution_log 追加带时间戳的日志,最后 generate_final_report 取最近 10 条日志拼进报告,做到过程可观测——出问题时,日志就是排查的第一现场。

6. 错误降级处理

每一层都给自己留了退路:规划阶段 decompose_goal 解析失败,会把异常写进日志并返回 False,整个流程提前终止;执行阶段单个任务失败,只把这一个任务置为 blocked,不影响其他任务继续跑;project.pyrun 方法先尝试跑完整异步工作流,捕获异常后再降级到直接调用 agent.run,再失败才返回错误文案。这种"逐层兜底"的设计,保证 Agent 不会因为单点故障就整体崩盘。

7. 三层架构的解耦优势

模型层只定义数据结构和状态机,不依赖任何外部服务;提示词层是纯字符串常量,可独立审阅和迭代;工具层用 @tool 装饰器把能力暴露给 Agent,内部依赖模型层和记忆层;服务层编排工具却不关心工具怎么实现;项目层只管注册和对外接口。这种分层让每一层都能单独测试和替换——比如把 decompose_goal 从示例数据换成真实 LLM 生成,只动工具层,模型层和服务层一行都不用改。__init__.py 统一 re-export 并注册到全局 registry,对外只暴露 run / build_agent / get_tools,调用方完全无需关心内部分层。

💡 顿悟时刻:回看全书,从第 1 章只会回答问题的"单兵",到现在能规划、能判断风险、知道该请示谁、还会把经验存下来的"自主 Agent"——这条主线的本质,是 Agent 一步步长出了"边界感"和"成长性"。会干活只是及格,懂边界、能积累,才是一个 Agent 从"工具"走向"队友"的分水岭。


12.5 跑一跑:它真的行吗

离线测试要回答的核心问题是:这台 Agent 的"边界感"到底靠不靠谱?所以用例专门盯着风险分流和记忆读写:

  • 任务拆解返回合法 JSON
  • 风险评估能识别高低风险
  • 低风险执行工具返回完成信息
  • 高风险交接工具返回人工确认单
  • 经验记忆可写入和检索

换句话说,测的就是"该自动的自动了没、该交接的拦住了没、经验存进去还能不能捞回来"。这三道关卡守住,Agent 才敢放心上岗。

运行:

1
2
cd backend
pytest projects/p10_autonomous_task/test_agent.py -v

12.6 送上线:让它上班

前端选择「自主任务规划 Agent」,输入:

帮我规划一次新产品发布会。

你会看到 Agent 先检索历史经验,再拆任务、评风险、自动推进低风险项、创建人工交接单,并输出最终报告——活脱脱一个有边界感、会复盘的"AI 项目助理"在屏幕里跑完整套流程。

生产环境中,这套东西要真敢上,还有几条规矩必须立:

  • 所有高风险交接单进入工单系统,不能只活在日志里。
  • 自动执行任务必须有审计日志,谁干的、干了什么、什么时候干的,可追溯。
  • 长期记忆支持用户删除与导出——经验库是用户的资产,处置权得交还用户。
  • 对外部动作配置白名单,没在白名单里的操作,Agent 再"能"也不许碰。

⚠️ 避坑:这几条看着像行政要求,其实条条都是血泪。交接单不进工单系统,人工就会漏处理;没有审计日志,出了事连谁都查不清;记忆不让删,不仅违规还埋雷;外部动作不设白名单,等于把家门钥匙挂门外。自主权越大的 Agent,这几条越不是可选项。


12.7 回头看:学到了什么

自主任务规划 Agent 是本书十个项目的集大成。它把前面练过的所有本事,一股脑儿串成了一条完整的能力链:

能力 在本章中的体现
Prompt 流程约束型 System Prompt
Tool 六个任务规划工具
Harness create_agent 内部循环
记忆 历史经验检索与沉淀
安全 高风险任务人工交接
反思 任务结束后复盘写入经验库

💡 诚实交代一句:本章是单个自主 Agent 加上"Agent↔人工"的交接协作,capabilities = ["Planning", "Reflection", "Memory", "Tool", "Autonomy"] 里没有 Multi-Agent。这里的"协作",指的是 Agent 懂得把干不了的事交回给人,而不是多个 Agent 之间的群体协同。真正的多 Agent 编队,请看第 9、10 章的内容创作与研究分析平台。把"自主"和"多智能体"分清楚,才不会高估了它的能力边界。

至此,十个项目的核心实战全部完成。从第一章那个只会回答问题的单兵,到今天这个能拆目标、判风险、懂交接、会复盘、还长记性的自主 Agent——这条"从单兵到团队、从工具到队友"的主线,已经走完最关键的一程。后续章节将回到工程层面,进一步讲统一前端管理台和生产级最佳实践。


如果您喜欢此博客或发现它对您有用,则欢迎对此发表评论。 也欢迎您共享此博客,以便更多人可以参与。 如果博客中使用的图像侵犯了您的版权,请与作者联系以将其删除。 谢谢 !